
athena iceberg #659

Merged: 38 commits merged into devel on Oct 16, 2023

Conversation

sh-rp (Collaborator) commented Sep 28, 2023

Description

Implements iceberg tables on Athena.
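
A hedged usage sketch of the feature as described in this thread: the table_format hint (discussed below) marks individual resources as iceberg. Pipeline and resource names are made up, and the exact parameter surface may differ from the merged code:

    import dlt

    # mark a single resource as iceberg via the table_format hint; other
    # resources in the same pipeline stay regular external tables
    @dlt.resource(table_format="iceberg")
    def events():
        yield {"id": 1, "created_at": "2023-09-28T00:00:00Z"}

    pipeline = dlt.pipeline(destination="athena", staging="filesystem", dataset_name="events_data")
    pipeline.run(events())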

netlify bot commented Sep 28, 2023

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: 918c4d5
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/652d7b202aa37300084dc783

sh-rp force-pushed the d#/athena-iceberg branch from bd5527b to 29a6d06 on October 4, 2023

    if insert_sql.strip()[-1] != ";":
        insert_sql += ";"
    sql.append(insert_sql)
    # -- DELETE FROM {staging_table_name} WHERE 1=1;

    # clean up
    if insert_temp_table_name:

sh-rp (Collaborator, Author): iceberg does not support temp tables.

rudolfix (Collaborator): this should happen only in Athena. If we need a cleanup method that gets implemented in a subclass, then let's do that.

@@ -93,17 +100,17 @@ def gen_key_table_clauses(cls, root_table_name: str, staging_root_table_name: st

        A list of clauses may be returned for engines that do not support OR in subqueries. Like BigQuery
        """
-       return [f"FROM {root_table_name} as d WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} as s WHERE {' OR '.join([c.format(d='d',s='s') for c in key_clauses])})"]
+       return [f"FROM {root_table_name} WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} as s WHERE {' OR '.join([c.format(d=root_table_name,s='s') for c in key_clauses])})"]
sh-rp (Collaborator, Author): iceberg does not support table aliases in the DELETE statement. I tried a few suggestions I found on Stack Overflow (for other SQL dialects), but none worked.

rudolfix (Collaborator) commented Oct 8, 2023: AFAIK this will conflict with other destinations that do not work with non-aliased tables; this is getting into spaghetti-code mode. BTW, I'm totally OK with what you did here, if other destinations still work.

EDIT: seems it was BigQuery, see the CI:

    google.api_core.exceptions.BadRequest: 400 Query error: Unrecognized name: `chat-analytics-rasa-ci` at [1:64]

BigQuery can't work with non-aliased tables in DELETE :/ I remember that because I spent a day going from the code you did above to the aliased one.

rudolfix (Collaborator) left a review comment:

There's a lot of good stuff here (i.e. how we create followup jobs is generalized, plus some method renames connected to that 👍).

Regarding the merge: if we are getting into something really complicated, let's drop it from this ticket (or altogether). I had no idea that there are no temp tables on Iceberg; we may need a fully specialized merge job for it.

Some other things look hacky to me, i.e. that we make the Athena connector aware of working on the staging dataset, cutting "staging" from the name, etc.

Maybe we should do something different. What about this:

  • iceberg tables must be marked as such in the table schema (i.e. a table_format hint) so all the components in dlt know about it and can react to it. BTW, typically only 1-2 tables should be iceberg, not all (i.e. those that hold GDPR data)
  • having that, we change how filesystem works: if it sees an iceberg table, it always does replace on that table, so we keep only the most recent load id (filesystem knows if it is used as staging, so it can do that selectively)
  • the athena destination creates all iceberg tables in the staging dataset and generates copy/merge jobs to move data to the iceberg tables. It only has the new data in the stage (due to replace on iceberg tables). We could also use load_id to selectively copy data, but then the staging dataset would grow all the time

This will require a refactor, but I think we'll get simpler, cleaner code.

Maybe if we make

    def get_load_table(schema: Schema, file_name: str) -> TTableSchema:

a method of the JobClient, then the job client will produce a transient table schema with the write disposition and table format properly set, and possibly other settings. It may also get an additional flag indicating whether the table is destined for the staging dataset or not (see the sketch after this comment).

Also, if we replace

    job_client.get_stage_dispositions()

with a more selective method that gets a table schema and decides whether it goes to the staging dataset or not, then we may move this logic out of load.py altogether.
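
A minimal sketch of that proposal, assuming TTableSchema is a plain dict and the is_staging idea above; names and the exact fields touched are illustrative, not the merged API:

    from copy import deepcopy
    from typing import Any, Dict

    TTableSchema = Dict[str, Any]  # stand-in for dlt's typed dict

    class JobClientBase:
        schema: Any  # provided by the concrete client

        def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
            # deepcopy so per-load tweaks do not leak back into the stored schema document
            table = deepcopy(self.schema.tables[table_name])
            if staging and table.get("table_format") == "iceberg":
                # in the staging dataset the data lands as a plain external table
                del table["table_format"]
            return table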

    def to_db_integer_type(self, precision: Optional[int]) -> str:
        if precision is None:
            return "bigint"
        # iceberg does not support smallint and tinyint

rudolfix (Collaborator): FYI: TIMESTAMP is precision 6 on iceberg, 3 on parquet.
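
A hedged sketch of an integer mapping that honors that constraint; the iceberg_mode flag mirrors the one discussed later in this review, and the precision cutoffs are illustrative:

    from typing import Optional

    def to_db_integer_type(self, precision: Optional[int], iceberg_mode: bool = False) -> str:
        if precision is None:
            return "bigint"
        if iceberg_mode:
            # iceberg does not support smallint and tinyint, so widen to int
            return "int" if precision <= 32 else "bigint"
        if precision <= 8:
            return "tinyint"
        if precision <= 16:
            return "smallint"
        if precision <= 32:
            return "int"
        return "bigint"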


@@ -136,7 +143,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:

        Returns:
            sql statement that inserts data from selects into temp table
        """
-       return f"CREATE TEMP TABLE {temp_table_name} AS {select_sql};"
+       return f"CREATE TABLE {temp_table_name} AS {select_sql};"

rudolfix (Collaborator): this should happen only in Athena? We should use temp tables everywhere it is possible. If it is really very complicated, we can give up on merge in this ticket.
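
A sketch of the subclass approach suggested here, keeping TEMP tables in the base merge job and overriding only for Athena; class names are illustrative:

    class SqlMergeJob:
        @classmethod
        def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
            # default: engines with temp-table support keep using them
            return f"CREATE TEMP TABLE {temp_table_name} AS {select_sql};"

    class AthenaMergeJob(SqlMergeJob):
        @classmethod
        def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
            # iceberg has no temp tables: create a regular table instead and
            # drop it in a cleanup step at the end of the merge script
            return f"CREATE TABLE {temp_table_name} AS {select_sql};"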



        sql: List[str] = []

        # for the system tables we need to create empty iceberg tables to be able to run DELETE and UPDATE queries
-       is_iceberg = self.schema.tables[table_name].get("write_disposition", None) == "skip"
+       # or if we are in iceberg mode, we create iceberg tables for all tables
+       is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or (self._is_iceberg_table(self.schema.tables[table_name]) and not self.in_staging_mode)

sh-rp (Collaborator, Author): this "in staging mode" is still here, unfortunately. I don't know how to do it any other way.

rudolfix (Collaborator): if you move get_load_table to the JobClient, you can modify the table format to keep iceberg (destination dataset) or not (staging dataset). Of course you need to pass an is_staging flag to the method. I do not like it, but it is probably a good compromise.

rudolfix (Collaborator): as discussed, use get_load_table() to:

  • adjust the precision
  • set the table format

and pass the result to the code below.


    def table_needs_staging(self, table: TTableSchema) -> bool:
        # not so nice; how to do it better? collect this info from the main destination as before?
        if table["table_format"] == "iceberg":

sh-rp (Collaborator, Author): this is hardcoded in here now; I think this should be configured in the main destination, as I did before...

rudolfix (Collaborator): I'd rather say that this table needs truncation! We do not need to keep it in the staging dataset, we just need to truncate it. To me it makes sense... if you have iceberg, for sure you need to copy/move the data, and keeping the full history does not make sense...
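
A minimal sketch of that framing, with a hypothetical table_needs_truncation helper in place of the hardcoded staging check:

    def table_needs_truncation(self, table: TTableSchema) -> bool:
        # iceberg data is copied/merged out of the stage on every load,
        # so the staged files can be truncated instead of kept as history
        return table.get("table_format") == "iceberg"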

rudolfix (Collaborator) left a review comment:

I really like how load.py works now. And I think we can simplify the implementation a lot by moving get_load_table into the JobClient.

@@ -69,13 +69,18 @@ class AthenaTypeMapper(TypeMapper):
        "int": "bigint",
    }

+   def __init__(self, capabilities: DestinationCapabilitiesContext, iceberg_mode: bool):
+       super().__init__(capabilities)
+       self.iceberg_mode = iceberg_mode

rudolfix (Collaborator): I do not think we need iceberg_mode; you can just set it up per table.

@@ -525,6 +536,18 @@ def get_top_level_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
        return get_top_level_table(tables, parent)
    return table

+def get_load_table(tables: TSchemaTables, table_name: str) -> TTableSchema:

rudolfix (Collaborator): this should IMO be part of the JobClient.

        if precision <= 8:
-           return "tinyint"
+           return "int" if self.iceberg_mode else "tinyint"

rudolfix (Collaborator): that's why the JobClient should create/modify the table schema: you can modify the precision there and do not need to hack the type mapper...

sh-rp (Collaborator, Author): Wouldn't it be cleanest to have a subclass for iceberg and set that before the table SQL is generated? I don't feel like changing the type mapper is hacking at all; that is what it is there for: changing the mapping of the types depending on the database / table format you are storing into.

sh-rp (Collaborator, Author): Actually, we could extend the type mapper to have the info about which table_format is currently being processed. That might be nice?


dlt/destinations/filesystem/filesystem.py (review thread resolved)


dlt/destinations/job_client_impl.py (review thread resolved)
    finally:
        self.in_staging_mode = False

    def table_needs_staging(self, table: TTableSchema) -> bool:

rudolfix (Collaborator): 👍

    def get_load_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
        try:
            # make a copy of the schema so modifications do not affect the original document
            table = copy(tables[table_name])

rudolfix (Collaborator): we'll need a deepcopy because we will modify the columns.
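
The fix, as a one-line sketch (a shallow copy shares the nested column dicts, so mutating a column's precision would corrupt the stored schema):

    from copy import deepcopy

    # deepcopy duplicates the nested "columns" dicts as well, so per-load
    # modifications stay local to this copy of the table schema
    table = deepcopy(tables[table_name])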

sh-rp force-pushed the d#/athena-iceberg branch from 3d0c4ed to 122d035 on October 11, 2023
@@ -293,11 +303,12 @@ def __init__(self, schema: Schema, config: AthenaClientConfiguration) -> None:
        super().__init__(schema, config, sql_client)
        self.sql_client: AthenaSQLClient = sql_client  # type: ignore
        self.config: AthenaClientConfiguration = config
-       self.type_mapper = AthenaTypeMapper(self.capabilities)
+       self.type_mapper = AthenaTypeMapper(self.capabilities, True)

rudolfix (Collaborator): some tables are iceberg, some are not; you can't always pass True here, see below.


    columns = ", ".join([self._get_column_def_sql(c) for c in new_columns])

    # this will fail if the table prefix is not properly defined
    table_prefix = self.table_prefix_layout.format(table_name=table_name)
    location = f"{bucket}/{dataset}/{table_prefix}"

rudolfix (Collaborator): one thing (maybe helpful): when we create the parquet file we are setting the precision of various fields, i.e. datetime. Make sure we do not have problems here (you should probably take both capabilities and table format into account, but I think our current implementation is good; I hope it works).


    def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
        table = super().get_load_table(table_name, staging)
        # if staging and table.get("table_format", None) == "iceberg":

rudolfix (Collaborator): heh, I think you need to uncomment that and also modify the precision.
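
A sketch of what the uncommented override could look like, per this thread: staging tables must not be iceberg, and iceberg timestamps are precision 6 while the parquet staging files use 3. The exact fields touched are illustrative:

    def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
        table = super().get_load_table(table_name, staging)
        if staging and table.get("table_format", None) == "iceberg":
            # the staging copy is a plain external parquet table, not iceberg
            del table["table_format"]
            for column in table["columns"].values():
                if column.get("data_type") == "timestamp":
                    # parquet staging files carry millisecond (precision 3) timestamps
                    column["precision"] = 3
        return table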

dlt/destinations/athena/configuration.py (review thread resolved)
dlt/destinations/filesystem/configuration.py (outdated review thread, resolved)
    def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
        current_dataset_path = self.dataset_path
        try:
            self.dataset_path = posixpath.join(self.fs_path, self.config.normalize_dataset_name(self.schema)) + "_staging"

rudolfix (Collaborator): suggestion:

    self.config.normalize_dataset_name(SqlClientBase.make_staging_dataset_name(self.schema))

sh-rp (Collaborator, Author): are you sure you want to use a method from SqlClientBase in the filesystem destination? This does not seem quite right...

rudolfix (Collaborator): so we need to move it to the client base? Or to WithStagingDataset?

rudolfix (Collaborator): @sh-rp we need to fix this one. At least do not append _staging to the already-normalized string.
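
A minimal sketch of the fix, assuming a hypothetical normalize_identifier helper standing in for whatever normalizer the config exposes:

    import posixpath

    def staging_dataset_path(self) -> str:
        # build the staging name first, then normalize the whole thing, so that
        # "_staging" goes through the same identifier rules as the dataset name
        staging_name = f"{self.schema.name}_staging"
        return posixpath.join(self.fs_path, self.normalize_identifier(staging_name))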

dlt/destinations/sql_jobs.py (review thread resolved)
sh-rp force-pushed the d#/athena-iceberg branch from 701f18a to ad8dc9b on October 12, 2023

        @abstractmethod
        def with_staging_dataset(self) -> ContextManager["JobClientBase"]:
            """Executes job client methods on staging dataset"""
            return self  # type: ignore

    class SupportsStagingDestination():

sh-rp (Collaborator, Author): With this, the main destination can control the behavior of the staging destination. This was kind of undefined before, so we were lucky that Athena worked properly; now it is well defined, and we will also need this on other data lakes.

rudolfix (Collaborator): good! You could use that in tests to generate test configs automatically, no?
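
A hedged sketch of the interface idea; only table_needs_staging appears in this PR's diffs, so the docstring and error behavior here are illustrative:

    class SupportsStagingDestination:
        """Implemented by destination clients that load through a staging destination
        and want to steer how that stage treats their tables."""

        def table_needs_staging(self, table: TTableSchema) -> bool:
            # e.g. Athena routes iceberg tables through the filesystem stage
            raise NotImplementedError()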

    # Override in subclass if db supports other integer types (e.g. smallint, integer, tinyint, etc.)
    return self.sct_to_unbound_dbt["bigint"]

-   def to_db_type(self, column: TColumnSchema) -> str:
+   def to_db_type(self, column: TColumnSchema, table_format: TTableFormat = None) -> str:

sh-rp (Collaborator, Author): to me, manipulating the load table to achieve the correct mapping feels hacky. I would say that the type mapper should be in charge of mapping schema types to db types, and in the case of Athena (and maybe others in the future) it has to take the chosen table format into account.
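
A sketch of that dispatch, following the new signature in the diff above; the method body is illustrative and only the bigint branch is shown in detail:

    class AthenaTypeMapper(TypeMapper):
        def to_db_type(self, column: TColumnSchema, table_format: TTableFormat = None) -> str:
            if column["data_type"] == "bigint":
                # let the integer mapping react to the table format, e.g. widening
                # tinyint/smallint to int for iceberg tables
                return self.to_db_integer_type(column.get("precision"), table_format == "iceberg")
            # all other types use the format-independent mapping
            return super().to_db_type(column)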

sh-rp (Collaborator, Author) commented Oct 14, 2023

Notes:

  • I have disabled the dbt tests for iceberg on Athena for now. The dbt-athena community plugin page says iceberg should be supported with some caveats, so this would need more work to resolve.
  • I think it should be fairly easy to get merge to work, but we can do that in the next step.
  • We should save the iceberg table_format into the schema when the env var is set, but I am not sure where in the code this should be done so that this data is persisted. Do we have any destination that changes the schema in our code, and does this concept make sense?

sh-rp force-pushed the d#/athena-iceberg branch from f13c14f to ba0c593 on October 14, 2023
rudolfix (Collaborator) left a review comment:

This is very good now!

  • see my comment regarding "_staging" (we must normalize the whole name)
  • dbt problems: it is clear from the error that we are copying timestamps from an iceberg table (precision 6) to a regular table (precision 3), and this fails:

    10:05:28  NOT_SUPPORTED: Incorrect timestamp precision for timestamp(6); the configured precision is MILLISECONDS; column name: end_time. You may need to manually clean the data at location 's3://dlt-athena-output/tables/850461bf-aa78-4d8c-8be7-36c1630da099' before retrying. Athena will not delete data in your account.

My take: disable the dbt chess tests on athena + iceberg and leave just jaffle shop, and maybe mention the error in the docs. Or you mess around with table_type='iceberg' in the materialization in chess.




rudolfix marked this pull request as ready for review on October 16, 2023
rudolfix (Collaborator) left a review comment:

LGTM!

rudolfix merged commit 87b210b into devel on Oct 16, 2023 (33 of 36 checks passed)
rudolfix deleted the d#/athena-iceberg branch on October 16, 2023